Fetch is a Scala library for organizing access to data in file systems, databases, web services and any other source whose data can be retrieved by a unique identifier. The library is written in a functional style and is based on Cats and Cats Effect. It is designed for composing and optimizing requests to different data sources. It lets you:
- request data from several sources in parallel;
- request data from a single source in parallel;
- combine requests to one source into a single request;
- deduplicate requests in each of the situations listed above;
- cache request results.
To achieve this, the library provides tools for writing clean business code without the low-level constructs these optimizations would otherwise require.
The examples use Fetch 1.3.0, the latest version at the time of writing.
Data sources in Fetch
To give Fetch access to a source, you need to implement:
- a description of the data source (the trait `Data[I, A]`);
- methods for retrieving data from the source (the trait `DataSource[F[_], I, A]`).
Here `I` is the identifier type, `A` is the data type and `F` is the effect type. The `DataSource` trait is defined as follows:
```scala
/**
 * A `DataSource` is the recipe for fetching a certain identity `I`, which yields
 * results of type `A` performing an effect of type `F[_]`.
 */
trait DataSource[F[_], I, A] {
  def data: Data[I, A]

  implicit def CF: Concurrent[F]

  /** Fetch one identity, returning a None if it wasn't found.
   */
  def fetch(id: I): F[Option[A]]

  /** Fetch many identities, returning a mapping from identities to results. If an
   * identity wasn't found, it won't appear in the keys.
   */
  def batch(ids: NonEmptyList[I]): F[Map[I, A]] =
    FetchExecution.parallel(
      ids.map(id => fetch(id).tupleLeft(id))
    ).map(_.collect { case (id, Some(x)) => id -> x }.toMap)

  def maxBatchSize: Option[Int] = None

  def batchExecution: BatchExecution = InParallel
}
```
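`data` returns the description of the source (`Data[I, A]`), and `CF` provides the `Concurrent` instance for the effect. `fetch` retrieves one element by its ID and returns `None` if the element is not found. `batch` retrieves several elements by their IDs. By default it calls `fetch` for every ID in parallel and leaves missing IDs out of the resulting map, so implementing `fetch` is enough to get started. `maxBatchSize` and `batchExecution` tune batching and are covered below.
Here is a source backed by a list of strings, where the ID is the index in the list: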
```scala
class ListData(val list: List[String]) extends Data[Int, String] {
  override def name: String = "My List of Data"
}

class ListDataSource(list: ListData)(implicit cs: ContextShift[IO])
    extends DataSource[IO, Int, String]
    with LazyLogging {
  override def data: ListData = list

  /* not declared implicit: that leads to a stack overflow */
  override def CF: Concurrent[IO] = Concurrent[IO]

  override def fetch(id: Int): IO[Option[String]] =
    CF.delay {
      logger.info(s"Processing element from index $id")
      list.list.lift(id) // None if the index is out of range
    }
}
```
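`ListDataSource` implements only `fetch`, so it uses the default `batch`. You can model that default contract with the standard library alone. The sketch below is a synchronous simplification with no effect type `F` and no parallelism. `BatchModel`, `fetchFromList` and `defaultBatch` are hypothetical names, not part of Fetch:

```scala
object BatchModel {
  // Hypothetical stand-in for DataSource.fetch over a list:
  // list.lift returns None when the index is out of range.
  def fetchFromList(list: List[String]): Int => Option[String] = list.lift

  // Mirrors the default batch: fetch every ID, pair it with its result,
  // and keep only the IDs that were found.
  def defaultBatch[I, A](ids: List[I])(fetch: I => Option[A]): Map[I, A] =
    ids.map(id => id -> fetch(id)).collect { case (id, Some(x)) => id -> x }.toMap
}
```

For example, `BatchModel.defaultBatch(List(0, 2, 5))(BatchModel.fetchFromList(List("a", "b", "c")))` gives `Map(0 -> "a", 2 -> "c")`. ID 5 is simply absent from the keys, as the `DataSource` documentation describes.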
You can also define the `DataSource` inside the `Data` implementation, so that one class describes the whole source:
```scala
class ListSource(list: List[String])(implicit cf: ContextShift[IO]) extends Data[Int, String] with LazyLogging {
  override def name: String = "My List of Data"

  private def instance: ListSource = this

  def source: DataSource[IO, Int, String] = new DataSource[IO, Int, String] {
    override def data: Data[Int, String] = instance
    override def CF: Concurrent[IO] = Concurrent[IO]

    override def fetch(id: Int): IO[Option[String]] =
      CF.delay {
        logger.info(s"Processing element from index $id")
        list.lift(id)
      }
  }
}
```
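Creating and running a Fetch
The central type of the library is `Fetch`. A `Fetch` is only a description, a "plan", of how to get the data. Nothing happens until the plan is executed with `Fetch.run` or one of its variants. You create a `Fetch` from an ID and a data source. To run it, the effect `F` must have a `Concurrent[F]` instance. For example: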
```scala
val list = List("a", "b", "c")
val data: ListSource = new ListSource(list)
val source: DataSource[IO, Int, String] = data.source
val fetchDataPlan: Fetch[IO, String] = Fetch(1, source) // only a description
val fetchData: IO[String] = Fetch.run(fetchDataPlan)     // an IO that executes it
val dataCalculated: String = fetchData.unsafeRunSync()   // b
```
Now let's request several IDs in turn, including one that is not in the list:
```scala
import cats.effect.{ContextShift, IO, Timer}
import fetch._
import scala.concurrent.ExecutionContext

object Example extends App {
  implicit val ec: ExecutionContext = ExecutionContext.global
  implicit val cs: ContextShift[IO] = IO.contextShift(ec) // needed by Fetch.run and ListDataSource
  implicit val timer: Timer[IO] = IO.timer(ec)           // needed by Fetch.run

  val list = List("a", "b", "c")
  val data = new ListSource(list)
  val source = data.source

  Fetch.run(Fetch(0, source)).unsafeRunSync()
  // INFO ListDataSource - Processing element from index 0
  Fetch.run(Fetch(1, source)).unsafeRunSync()
  // INFO ListDataSource - Processing element from index 1
  Fetch.run(Fetch(2, source)).unsafeRunSync()
  // INFO ListDataSource - Processing element from index 2
  Fetch.run(Fetch(3, source)).unsafeRunSync()
  // INFO ListDataSource - Processing element from index 3
  // Exception in thread "main" fetch.package$MissingIdentity
}
```
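`fetch` returns `list.lift(id)`, so it yields `None` for a missing index. Although `fetch` in `DataSource[F[_], I, A]` returns an `Option`, a `Fetch` created with `Fetch(id, source)` treats `None` as an error and fails with `MissingIdentity`. To get the `Option` itself, use `Fetch.optional`: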
```scala
Fetch.run(Fetch.optional(3, source)).unsafeRunSync() // None
```
The types show the difference:
```scala
val fApply: Fetch[IO, String] = Fetch(3, source)
val fOptional: Fetch[IO, Option[String]] = Fetch.optional(3, source)
```
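The difference between the two constructors can be modeled without Fetch, assuming a synchronous source function `I => Option[A]` and using `Either` in place of a failed `IO`. `MissingIdentity` here is a local stand-in class, not Fetch's `fetch.package$MissingIdentity`. `required` and `optional` are hypothetical names:

```scala
object OptionalModel {
  // Local stand-in for Fetch's MissingIdentity error.
  final case class MissingIdentity[I](id: I) extends Exception(s"Identity $id not found")

  // Like Fetch(id, source): a missing value turns into an error.
  def required[I, A](id: I)(fetch: I => Option[A]): Either[Throwable, A] =
    fetch(id).toRight(MissingIdentity(id))

  // Like Fetch.optional(id, source): a missing value is just None.
  def optional[I, A](id: I)(fetch: I => Option[A]): Either[Throwable, Option[A]] =
    Right(fetch(id))
}
```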
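For convenience, let's add a method to `Data` (our `ListSource`) that wraps `Fetch.optional`:
```scala
def fetchElem(id: Int): Fetch[IO, Option[String]] = Fetch.optional(id, source)
```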
```scala
Fetch.run(data.fetchElem(0)).unsafeRunSync() // INFO app.ListDataSource - Processing element from index 0
Fetch.run(data.fetchElem(1)).unsafeRunSync() // INFO app.ListDataSource - Processing element from index 1
Fetch.run(data.fetchElem(2)).unsafeRunSync() // INFO app.ListDataSource - Processing element from index 2
println(Fetch.run(data.fetchElem(2)).unsafeRunSync()) // Some(c)
println(Fetch.run(data.fetchElem(3)).unsafeRunSync()) // None
```
By default, every call to `Fetch.run` queries the source again, even for an ID that has already been fetched:
```scala
def fetch(id: Int): Option[String] = {
  val run = Fetch.run(data.fetchElem(id))
  run.unsafeRunSync()
}

fetch(1) // INFO app.ListDataSource - Processing element from index 1
fetch(1) // INFO app.ListDataSource - Processing element from index 1
fetch(1) // INFO app.ListDataSource - Processing element from index 1
```
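Fetch has a cache abstraction, `DataCache[F[_]]`, with a default implementation `InMemoryCache[F[_]: Monad](state: Map[(Data[Any, Any], DataSourceId), DataSourceResult])`. You create it with `from` (pre-filled with values) or `empty`:
```scala
def from[F[_]: Monad, I, A](results: ((Data[I, A], I), A)*): InMemoryCache[F]
def empty[F[_]: Monad]: InMemoryCache[F]
```
Internally it is a `Map[(Data[Any, Any], DataSourceId), DataSourceResult]`, where the ID and the result are wrapped in value classes:
```scala
final class DataSourceId(val id: Any) extends AnyVal
final class DataSourceResult(val result: Any) extends AnyVal
```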
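The key is the pair `(Data[Any, Any], DataSourceId)`: the source description plus the ID. The value is a `DataSourceResult`. Both wrap `Any`, so the cache itself is untyped. This is how `InMemoryCache` looks up a value:
```scala
def lookup[I, A](i: I, data: Data[I, A]): F[Option[A]] =
  Applicative[F].pure(
    state
      .get((data.asInstanceOf[Data[Any, Any]], new DataSourceId(i)))
      .map(_.result.asInstanceOf[A])
  )
```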
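To build the key, `Data` is cast to `Data[Any, Any]`, and the stored value is cast back with `asInstanceOf[A]`. Insertion uses `Map.updated` and returns a new cache. Because the cache is an immutable Scala `Map`, inserting never changes an existing instance, so you have to pass the updated cache to the next run explicitly.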
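The same idea fits in a few lines of plain Scala. This is a simplified sketch, not Fetch's `InMemoryCache`. The key pairs a source name with an ID (Fetch uses the `Data` instance itself), values are stored as `Any` and cast back on lookup, and `insert` returns a new cache built with `updated`. `SimpleCache` is a hypothetical name:

```scala
final case class SimpleCache(state: Map[(String, Any), Any] = Map.empty) {
  // Values are stored untyped; the caller's type parameter restores the type.
  def lookup[A](source: String, id: Any): Option[A] =
    state.get((source, id)).map(_.asInstanceOf[A])

  // Immutable update: the original cache is left untouched.
  def insert[A](source: String, id: Any, value: A): SimpleCache =
    SimpleCache(state.updated((source, id), value))
}
```

As in Fetch, the cast is safe only if each source key always stores values of one type. Fetch enforces this through the types of `Data[I, A]`.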
Values put into the cache up front come back without a request to the source. IDs that are not in the cache are fetched on every run:
```scala
val cacheF: DataCache[IO] = InMemoryCache.from((data, 1) -> "b", (data, 2) -> "c")

Fetch.run(data.fetchElem(1), cacheF).unsafeRunSync() // from the cache, no log
Fetch.run(data.fetchElem(1), cacheF).unsafeRunSync() // from the cache, no log
Fetch.run(data.fetchElem(1), cacheF).unsafeRunSync()
Fetch.run(data.fetchElem(1), cacheF).unsafeRunSync()
Fetch.run(data.fetchElem(0), cacheF).unsafeRunSync()
Fetch.run(data.fetchElem(0), cacheF).unsafeRunSync()
// INFO app.ListDataSource - Processing element from index 0
// INFO app.ListDataSource - Processing element from index 0
```
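To keep the values fetched during a run, use `Fetch.runCache`, which returns the updated cache together with the result. Here the cache is kept in a `var` for simplicity: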
```scala
var cache: DataCache[IO] = InMemoryCache.empty

def cachedRun(id: Int): Option[String] = {
  val (c, r) = Fetch.runCache(Fetch.optional(id, source), cache).unsafeRunSync()
  cache = c // keep the updated cache for the next run
  r
}

// INFO app.ListDataSource - Processing element from index 1
// INFO app.ListDataSource - Processing element from index 2
// INFO app.ListDataSource - Processing element from index 4
// INFO app.ListDataSource - Processing element from index 4
```
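You can model the logic of threading a cache through successive runs without Fetch. In this sketch, which is synchronous and uses the hypothetical names `RunCacheModel` and `runCache`, the function returns the new cache alongside the result, and a counter records how often the underlying source was queried. The model stores only found values, so a missing ID reaches the source on every run. That is consistent with the two `index 4` lines in the log above, but it is a rule of this model, not a quote of Fetch's code:

```scala
object RunCacheModel {
  var sourceCalls = 0 // how many times the "source" was queried

  private val list = List("a", "b", "c")

  private def fetch(id: Int): Option[String] = {
    sourceCalls += 1
    list.lift(id)
  }

  // Returns the (possibly) updated cache together with the result.
  def runCache(id: Int, cache: Map[Int, String]): (Map[Int, String], Option[String]) =
    cache.get(id) match {
      case hit @ Some(_) => (cache, hit)
      case None =>
        val result = fetch(id)
        (result.fold(cache)(v => cache.updated(id, v)), result)
    }
}
```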
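Custom DataCache: Caffeine
You are not limited to the in-memory cache that ships with Fetch: you can write your own `DataCache` implementation. Here is one based on the Java library Caffeine, through its Scala wrapper Scaffeine: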
```scala
import com.github.blemale.scaffeine.Scaffeine

class ScaffeineCache extends DataCache[IO] with LazyLogging {
  private val cache =
    Scaffeine()
      .build[(Data[Any, Any], Any), Any]()
  override def lookup[I, A](i: I, data: Data[I, A]): IO[Option[A]] = IO {
    cache
      .getIfPresent(data.asInstanceOf[Data[Any, Any]] -> i)
      .map { any =>
        val correct = any.asInstanceOf[A]
        logger.info(s"From cache: $i")
        correct
      }
  }
  override def insert[I, A](i: I, v: A, data: Data[I, A]): IO[DataCache[IO]] = IO {
    cache.put(data.asInstanceOf[Data[Any, Any]] -> i, v) // put returns Unit
    this
  }
}
```
As in `InMemoryCache`, the Scaffeine cache stores keys and values as `Any` (`build[(Data[Any, Any], Any), Any]()`), so you have to cast values back with `asInstanceOf`.
```scala
val list = List("a", "b", "c")
val listSource = new ListSource(list)
val source = listSource.source
val randomSource = new RandomSource()
val cache = new ScaffeineCache()

// without a cache: the source is queried every time
Fetch.run(Fetch(1, source)).unsafeRunSync() // Processing element from index 1
Fetch.run(Fetch(1, source)).unsafeRunSync() // Processing element from index 1

// with the Scaffeine cache
Fetch.run(Fetch(1, source), cache).unsafeRunSync() // Processing element from index 1
Fetch.run(Fetch(1, source), cache).unsafeRunSync() // From cache: 1
// Fetch.run(Fetch("a", source), cache).unsafeRunSync() // does not compile: type mismatch

// another source sharing the same cache
Fetch.run(randomSource.fetchInt(2), cache).unsafeRunSync() // Getting next random by max 2
Fetch.run(randomSource.fetchInt(2), cache).unsafeRunSync() // From cache: 2
```
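The compiler rejects an ID of the wrong type with a `type mismatch`, because `Fetch` requires the ID type to match the source. That check, together with the source description in the key, is what makes the `asInstanceOf` casts inside the cache safe. Unlike `InMemoryCache`, the Caffeine cache is mutable, so `insert` updates it in place and returns the same instance.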
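Combining Fetch
You combine `Fetch` values with the usual Scala and Cats tools. A typical task is to turn a `List[Fetch[_, _]]` into a `Fetch[_, List[_]]` and execute it with a single `Fetch.run`. Within one run, Fetch applies its optimizations: parallel requests, batching, deduplication and caching. If you run every `Fetch` separately, you lose these optimizations. For example, this code queries the source twice: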
```scala
val t1: IO[(String, String)] = for {
  a <- Fetch.run(Fetch(1, source))
  b <- Fetch.run(Fetch(1, source))
} yield (a, b)
```
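Here each `Fetch` runs on its own inside a `for` over `IO`, so the same request executes twice. Instead, compose the `Fetch` values themselves in the `for` and run the result once: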
```scala
val f1: Fetch[IO, (String, String)] = for {
  a <- Fetch(1, source)
  b <- Fetch(1, source)
} yield (a, b)

val t2: IO[(String, String)] = Fetch.run(f1)
```
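There are several ways to combine `Fetch` values before passing them to `Fetch.run`. Use `for` comprehensions (`flatMap`) for requests that depend on each other, and Cats combinators for independent ones. `sequence` turns a `List[F[_]]` into `F[List[_]]`. `traverse` maps a function that returns `F[_]` over a `List[_]` and collects the results into `F[List[_]]`. `tupled` does the same for a tuple of `F` values: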
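```scala
import cats.implicits._
import fetch.fetchM // Cats instances for Fetch

// sequence: List[Fetch[IO, String]] => Fetch[IO, List[String]]
val f3: List[Fetch[IO, String]] = List(
  Fetch(1, source),
  Fetch(2, source),
  Fetch(2, source)
)
val f31: Fetch[IO, List[String]] = f3.sequence
val t3: IO[List[String]] = Fetch.run(f31)

// traverse: List[Int] => Fetch[IO, List[String]]
val f4: List[Int] = List(1, 2, 2)
val f41: Fetch[IO, List[String]] = f4.traverse(Fetch(_, source))
val t4: IO[List[String]] = Fetch.run(f41)

// tupled: (Fetch[IO, String], Fetch[IO, String]) => Fetch[IO, (String, String)]
val f5: (Fetch[IO, String], Fetch[IO, String]) = (Fetch(1, source), Fetch(2, source))
val f51: Fetch[IO, (String, String)] = f5.tupled
val t5: IO[(String, String)] = Fetch.run(f51)

// traverse over a tuple maps only its second element
val f6: (Int, Int) = (1, 2)
val f61: Fetch[IO, (Int, String)] = f6.traverse(Fetch(_, source))

// flatMap: sequential composition
val f0: Fetch[IO, String] = Fetch(1, source).flatMap(_ => Fetch(1, source))
val t0: IO[String] = Fetch.run(f0)
```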
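The shape of `sequence` and `traverse` is easy to write by hand. This sketch uses `Option` instead of `Fetch`, with the hypothetical helper names `traverseOpt` and `sequenceOpt`. Cats provides the general versions for any `Applicative`, `Fetch` included:

```scala
object TraverseModel {
  // traverse: apply f to every element and collect the results;
  // the whole result is None as soon as one element has no result.
  def traverseOpt[A, B](xs: List[A])(f: A => Option[B]): Option[List[B]] =
    xs.foldRight(Option(List.empty[B])) { (a, acc) =>
      for { b <- f(a); rest <- acc } yield b :: rest
    }

  // sequence is traverse with the identity function.
  def sequenceOpt[A](xs: List[Option[A]]): Option[List[A]] =
    traverseOpt[Option[A], A](xs)(identity)
}
```

With `lookup = List("a", "b", "c").lift`, `traverseOpt(List(1, 2, 2))(lookup)` gives `Some(List("b", "c", "c"))`. A list containing the out-of-range index 5 gives `None`.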
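Parallel requests
Fetch provides its own instances of the Cats type classes. With them, independent `Fetch` values combined via, for example, `tupled` execute in parallel within a single round. You have to import the instance: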
```scala
import fetch.fetchM // Cats instances for Fetch

val tuple: Fetch[IO, (Option[String], Option[String])] = (data.fetchElem(0), data.fetchElem(1)).tupled
Fetch.run(tuple).unsafeRunSync() // (Some(a),Some(b))
```
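Batching
You can combine requests to one source into a single call, much like an SQL query with `WHERE id IN (...)`. To do this, override `batch` in the `DataSource`:
```scala
override def batch(ids: NonEmptyList[Int]): IO[Map[Int, String]] = {
  logger.info(s"IDs fetching in batch: $ids")
  CF.delay(ids.toList.flatMap(id => list.lift(id).map(id -> _)).toMap) // found IDs only
}
```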
Now a `traverse` over several IDs results in one batched call:
```scala
import fetch.fetchM

def findMany: Fetch[IO, List[Option[String]]] =
  List(0, 1, 2, 3, 4, 5).traverse(data.fetchElem)

// INFO app.ListSource - IDs fetching in batch: NonEmptyList(0, 5, 1, 2, 3, 4)
```
`maxBatchSize` limits the size of a batch, which splits the IDs into several batches:
```scala
override def maxBatchSize: Option[Int] = 2.some // defaults to None

// INFO app.ListSource - IDs fetching in batch: NonEmptyList(0, 5)
// INFO app.ListSource - IDs fetching in batch: NonEmptyList(1, 2)
// INFO app.ListSource - IDs fetching in batch: NonEmptyList(3, 4)
```
By default, the batches execute in parallel. `batchExecution` makes them run one after another:
```scala
override def batchExecution: BatchExecution = Sequentially // defaults to `InParallel`
```
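Here is a sketch of how one round of requested IDs could turn into batch calls. It is a simplified model under stated assumptions, not Fetch's scheduler. IDs are deduplicated, split into groups of at most `maxBatchSize`, and each group becomes one `batch` call. The order of IDs inside Fetch's real batches can differ, as the log above shows. `BatchPlanModel.plan` is a hypothetical name:

```scala
object BatchPlanModel {
  // Split the distinct IDs requested in one round into batch calls.
  def plan[I](ids: List[I], maxBatchSize: Option[Int]): List[List[I]] = {
    val unique = ids.distinct
    maxBatchSize match {
      case Some(n) if n > 0 => unique.grouped(n).toList
      case _                => if (unique.isEmpty) Nil else List(unique)
    }
  }
}
```

Six IDs with `maxBatchSize = Some(2)` give three calls of two IDs each, matching the number of log lines above. Without a limit, all six IDs go into one call.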
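Combining data sources
You can also combine different sources, including cases where the ID for the next request depends on the result of the previous one. As an example, take a random number generator as a second source: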
```scala
import scala.util.Random

class RandomSource(implicit cf: ContextShift[IO]) extends Data[Int, Int] with LazyLogging {
  override def name: String = "Random numbers generator"
  private def instance: RandomSource = this

  def source: DataSource[IO, Int, Int] = new DataSource[IO, Int, Int] {
    override def data: Data[Int, Int] = instance
    override def CF: Concurrent[IO] = Concurrent[IO]
    override def fetch(max: Int): IO[Option[Int]] =
      CF.delay {
        logger.info(s"Getting next random by max $max")
        Some(Random.nextInt(max)) // a number in [0, max)
      }
  }
}
```
Let's combine it with `listSource` in one `Fetch`, using the random number as an index into the list:
```scala
val listSource = new ListSource(List("a", "b", "c"))
val randomSource = new RandomSource()

def fetchMulti: Fetch[IO, (Int, String)] =
  for {
    rnd <- Fetch(3, randomSource.source) // Fetch[IO, Int]
    char <- Fetch(rnd, listSource.source) // Fetch[IO, String]
  } yield (rnd, char)

println(Fetch.run(fetchMulti).unsafeRunSync()) // random, e.g. (0,a)
```
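Inside the `for`, the requests run one after the other, because the second needs the result of the first. Combine independent requests with `sequence` or `tupled` instead, which lets Fetch execute them in parallel.
Fetch also removes duplicate requests. If a run requests the same ID from the same source more than once, Fetch queries the source only once: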
```scala
val list = List("a", "b", "c", "d", "e", "f", "g", "h", "i")
val data = new ListSource(list, 2.some)
val tupleD: Fetch[IO, (Option[String], Option[String])] = (data.fetchElem(0), data.fetchElem(0)).tupled
//INFO app.sources.ListSource - Processing element from index 0
```
Deduplication also covers dependent requests. In the chain below, the program does not send the second random-number request (again with ID 3) or the list request that follows it, so the log has one line per source:
```scala
def fetchMultiD: Fetch[IO, (Int, String, Int, String)] =
  for {
    rnd1 <- Fetch(3, randomSource.source) // Fetch[IO, Int]
    char1 <- Fetch(rnd1, listSource.source) // Fetch[IO, String]
    rnd2 <- Fetch(3, randomSource.source) // Fetch[IO, Int]
    char2 <- Fetch(rnd2, listSource.source) // Fetch[IO, String]
  } yield (rnd1, char1, rnd2, char2)

//18:43:11.875 [scala-execution-context-global-14] INFO app.sources.RandomSource - Getting next random by max 3
//18:43:11.876 [scala-execution-context-global-13] INFO app.sources.ListSource - Processing element from index 1
```
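Errors in Fetch
If an ID is not found, `Fetch` fails with `MissingIdentity`, and `unsafeRunSync` throws it. To handle the error as a value, turn the result into an `Either`, for example with `attempt`: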
```scala
// val i: String = Fetch.run(Fetch(5, data.source)).unsafeRunSync() // Exception in thread "main" fetch.package$MissingIdentity
val i: Either[Throwable, String] = Fetch.run(Fetch(5, data.source)).attempt.unsafeRunSync() // Left(fetch.package$MissingIdentity)
```
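To see what Fetch actually did, run it with `Fetch.runLog`, which returns the execution log (`FetchLog`) together with the result. The `fetch-debug` module renders that log, or a `Throwable` raised by Fetch, in readable form: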
```scala
// libraryDependencies += "com.47deg" %% "fetch-debug" % "1.3.0"
import fetch.debug.describe

val t: Either[Throwable, (Log, String)] = Fetch.runLog(Fetch(5, data.source)).attempt.unsafeRunSync()
println(t.fold(describe, identity))
// [ERROR] Identity with id `5` for data source `My List of Data` not found, fetch interrupted after 1 rounds
// Fetch execution 0.00 seconds
//   [Round 1] 0.00 seconds
//     [Fetch one] From `My List of Data` with id 5 0.00 seconds
```
Here is the log of a more complex chain of `Fetch` values joined with `>>`, the Cats operator for `flatMap(_ => ...)`:
```scala
object DebugExample extends App with ContextEntities {
  val list = List("a", "b", "c", "d", "e", "f", "g", "h")
  val listData = new ListSource(list)
  val listSource: DataSource[IO, Int, String] = listData.source
  val randomSource = new RandomSource().source

  val cacheF: DataCache[IO] = InMemoryCache.from((listData, 1) -> "b")

  val cached = Fetch(1, listSource)
  // #1, not in the cache
  val notCached = Fetch(2, listSource)
  // #2
  val random = Fetch(10, randomSource)
  // #3
  val batched: Fetch[IO, (String, String)] = (Fetch(3, listSource), Fetch(4, listSource)).tupled
  // #4
  val combined = (Fetch(5, listSource), Fetch(150, randomSource)).tupled
  /** End of fetches */

  val complicatedFetch: Fetch[IO, (String, Int)] = cached >> notCached >> random >> notCached >> batched >> combined

  val result: IO[(Log, (String, Int))] = Fetch.runLog(complicatedFetch, cacheF)
  val tuple: (Log, (String, Int)) = result.unsafeRunSync()
  println(tuple._2) // (f,17)
}
```
```
Fetch execution 0.11 seconds
  [Round 1] 0.06 seconds
    [Fetch one] From `My List of Data` with id 2 0.06 seconds
  [Round 2] 0.00 seconds
    [Fetch one] From `Random numbers generator` with id 10 0.00 seconds
  [Round 3] 0.01 seconds
    [Batch] From `My List of Data` with ids List(3, 4) 0.01 seconds
  [Round 4] 0.00 seconds
    [Fetch one] From `Random numbers generator` with id 150 0.00 seconds
    [Fetch one] From `My List of Data` with id 5 0.00 seconds
```
The raw `FetchLog`:
```
FetchLog(Queue(Round(List(Request(FetchOne(2,app.sources.ListSource@ea6147e),10767139,10767203))), Round(List(Request(FetchOne(10,app.sources.RandomSource@58b31054),10767211,10767213))), Round(List(Request(Batch(NonEmptyList(3, 4),app.sources.ListSource@ea6147e),10767234,10767242))), Round(List(Request(FetchOne(150,app.sources.RandomSource@58b31054),10767252,10767252), Request(FetchOne(5,app.sources.ListSource@ea6147e),10767252,10767252)))))
```
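`cached` does not appear in the log at all, because its value came from the cache. The second `notCached` is missing too, because round 1 of the same run already fetched ID 2. IDs 3 and 4, combined with `tupled`, arrived in a single `batch` call. The two requests in `combined` go to different sources and run in parallel within round 4. The rounds follow the `>>` chain, since each step starts only after the previous one has finished.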
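A small model reproduces the round structure of this log. In the sketch below, all names are hypothetical and it does not reflect how Fetch is implemented. `One` is a single request, `Both` combines two plans in parallel like `tupled`, and `AndThen` sequences them like `>>`. The model drops requests that are already cached or were fetched in an earlier round, and skips empty rounds:

```scala
object RoundsModel {
  type Request = (String, Int) // (source name, ID)

  sealed trait Plan
  final case class One(req: Request) extends Plan
  final case class Both(a: Plan, b: Plan) extends Plan    // like tupled
  final case class AndThen(a: Plan, b: Plan) extends Plan // like >>

  // Raw rounds: parallel plans share rounds, sequential plans append them.
  private def raw(p: Plan): List[Set[Request]] = p match {
    case One(r) => List(Set(r))
    case Both(a, b) =>
      val (ra, rb) = (raw(a), raw(b))
      ra.zipAll(rb, Set.empty[Request], Set.empty[Request]).map { case (x, y) => x ++ y }
    case AndThen(a, b) => raw(a) ++ raw(b)
  }

  // Remove cached or already fetched requests and drop empty rounds.
  def rounds(p: Plan, cached: Set[Request]): List[Set[Request]] =
    raw(p).foldLeft((cached, List.empty[Set[Request]])) { case ((seen, acc), round) =>
      val fresh = round -- seen
      (seen ++ fresh, if (fresh.isEmpty) acc else acc :+ fresh)
    }._2
}
```

Running the model on the `DebugExample` chain, with list ID 1 pre-cached, gives the same four rounds as the log: `{list 2}`, `{random 10}`, `{list 3, list 4}`, `{list 5, random 150}`.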
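A more realistic example: document search
Consider a search service. For a query, it has to:
- find the IDs of matching documents with full-text search;
- for each ID, fetch the document info with its authors, as well as similar documents;
- assemble the response.
The data model (simplified):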
```scala
type DocumentId = String
type PersonId = Int // the test data below uses numeric person IDs

case class FtsResponse(ids: List[DocumentId])
case class SimilarityItem(id: DocumentId, similarity: Double)
case class DocumentInfo(id: DocumentId, info: String, authors: List[PersonId])
case class Person(id: PersonId, fullTitle: String)

case class DocumentSearchResponse(
  items: List[DocumentSearchItem]
)
case class DocumentItem(id: DocumentId, info: Option[String], authors: List[Person])
case class DocumentSimilarItem(
  item: DocumentItem,
  similarity: Double
)
case class DocumentSearchItem(
  item: DocumentItem,
  similar: List[DocumentSimilarItem]
)
```
```scala
import cats.implicits._
import fetch._

class DocumentSearchExample(
  fts: Fts[IO],
  documentInfoRepo: DocumentInfoRepo[IO],
  vectorSearch: VectorSearch[IO],
  personRepo: PersonRepo[IO]
)(
  implicit cs: ContextShift[IO]
) {
  val infoSource = new DocumentInfoSource(documentInfoRepo, 16.some)
  val personSource = new PersonSource(personRepo, 16.some)
  val similarSource = new SimilarDocumentSource(vectorSearch, 16.some)

  // a document with its info and authors
  def documentItemFetch(id: DocumentId): Fetch[IO, DocumentItem] =
    for {
      infoOpt <- infoSource.fetchElem(id)
      p <- infoOpt.traverse(i => i.authors.traverse(personSource.fetchElem).map(_.flatten))
    } yield DocumentItem(id, infoOpt.map(_.info), p.getOrElse(List.empty[Person]))

  // documents similar to the given one, each with its info and authors
  def fetchSimilarItems(id: DocumentId): Fetch[IO, List[DocumentSimilarItem]] =
    similarSource
      .fetchElem(id)
      .map(_.getOrElse(List.empty[SimilarityItem]))
      .flatMap {
        _.traverse { si =>
          documentItemFetch(si.id).map { di =>
            DocumentSimilarItem(di, si.similarity)
          }
        }
      }

  def searchDocumentFetch(query: String): Fetch[IO, DocumentSearchResponse] =
    for {
      docs <- Fetch.liftF(fts.search(query)) // lift a plain IO into Fetch
      items <- docs.ids.traverse { id =>
        (documentItemFetch(id), fetchSimilarItems(id)).tupled.map(r => DocumentSearchItem(r._1, r._2))
      }
    } yield DocumentSearchResponse(items)
}
```
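`DocumentInfoSource`, `PersonSource` and `SimilarDocumentSource` are Fetch data sources built on top of the corresponding repositories. Full-text search is not a Fetch source. Its `search` method returns a plain effect:
```scala
def search(query: String): F[FtsResponse]
```
so `Fetch.liftF` lifts it into a `Fetch[F, FtsResponse]`. Then the `for` fetches, for every found ID, the document item and its similar documents; `tupled` combines the two `Fetch` values, and `map` turns the pair into a `DocumentSearchItem`. `fetchSimilarItems` gets the IDs of similar documents from `similarSource` and calls `documentItemFetch` for each of them. That call fetches the `DocumentInfo` and then the authors by their IDs. To avoid a nested `Fetch[IO, Fetch[...]]`, `map` is followed by `flatMap`: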
```scala
.map(_.getOrElse(List.empty[SimilarityItem])) // Fetch[IO, List[SimilarityItem]]
.flatMap {
  _.traverse { si =>
    documentItemFetch(si.id).map { di => // Fetch[IO, DocumentSimilarItem]
      DocumentSimilarItem(di, si.similarity)
    }
  }
}
```
Test data:
```scala
private val docInfo = Map(
  "1" -> DocumentInfo("1", "Document 1", List(1)),
  "2" -> DocumentInfo("2", "Document 2", List(1, 2)),
  "3" -> DocumentInfo("3", "Document 3", List(3, 1)),
  "4" -> DocumentInfo("4", "Document 4", List(2, 1)),
  "5" -> DocumentInfo("5", "Document 5", List(2)),
  "6" -> DocumentInfo("6", "Document 6", List(1, 3))
)

private val similars = Map(
  "1" -> List(SimilarityItem("2", 0.7), SimilarityItem("3", 0.6)),
  "2" -> List(SimilarityItem("1", 0.7)),
  "3" -> List(SimilarityItem("1", 0.6)),
  "4" -> List(),
  "5" -> List(SimilarityItem("6", 0.5)),
  "6" -> List(SimilarityItem("5", 0.5))
)

private val persons = Map(
  1 -> Person(1, "Rick Deckard"),
  2 -> Person(2, "Roy Batty"),
  3 -> Person(3, "Joe")
)
```
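The test data explains why person lookups can collapse into one small batch. The author IDs of all six documents reduce to three distinct values. Here is a stdlib sketch of that computation, with a trimmed copy of `docInfo` (authors only) and a hypothetical helper `authorIds`:

```scala
object AuthorsModel {
  // Author IDs per document, copied from the docInfo test data above.
  val authors: Map[String, List[Int]] = Map(
    "1" -> List(1), "2" -> List(1, 2), "3" -> List(3, 1),
    "4" -> List(2, 1), "5" -> List(2), "6" -> List(1, 3)
  )

  // Distinct author IDs that a single batched person request would need.
  def authorIds(docIds: List[String]): Set[Int] =
    docIds.flatMap(id => authors.getOrElse(id, Nil)).toSet
}
```

For the six found documents this gives `Set(1, 2, 3)`.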
A search that finds all six documents produces the following logs:
```
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 2. It is: Some(List(1))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 4. It is: None
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 3. It is: Some(List(1))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 1. It is: Some(List(2, 3))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 6. It is: Some(List(5))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 5. It is: Some(List(6))
INFO app.searchfetchproto.source.DocumentInfoSource - Document IDs fetching in batch: NonEmptyList(4, 5, 2, 3, 6, 1)
INFO app.searchfetchproto.source.PersonSource - Person IDs fetching in batch: NonEmptyList(1, 2, 3)
```
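Similar documents are requested one ID at a time, while document info and persons come back in one batched call each. The Fetch log shows two rounds: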
```
[Round 1] 0.12 seconds
  [Batch] From `Similar Document Source` with ids List(4, 5, 2, 3, 6, 1) 0.06 seconds
  [Batch] From `Document Info Source` with ids List(4, 5, 2, 3, 6, 1) 0.12 seconds
[Round 2] 0.00 seconds
  [Batch] From `Persons source` with ids List(1, 2, 3) 0.00 seconds
```
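Because of `(documentItemFetch(id), fetchSimilarItems(id)).tupled`, the first requests of both fetches run in the same round. The document info that `fetchSimilarItems` needs requires no extra request: every similar document is among the six found documents, and round 1 already fetched their info:
```
[Batch] From `Similar Document Source` with ids List(4, 5, 2, 3, 6, 1) 0.06 seconds
[Batch] From `Document Info Source` with ids List(4, 5, 2, 3, 6, 1) 0.12 seconds
```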
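However, if the similar-documents source becomes slower (for example, after you add `Thread.sleep(100)` to it), the logs change: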
```
INFO app.searchfetchproto.source.DocumentInfoSource - Document IDs fetching in batch: NonEmptyList(4, 5, 2, 3, 6, 1)
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 4. It is: None
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 5. It is: Some(List(6))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 3. It is: Some(List(1))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 2. It is: Some(List(1))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 6. It is: Some(List(5))
INFO app.searchfetchproto.source.SimilarDocumentSource - Fetching similar documents for ID: 1. It is: Some(List(2, 3))
INFO app.searchfetchproto.source.DocumentInfoSource - Document IDs fetching in batch: NonEmptyList(5, 2, 3, 6, 1)
```
```
[Round 1] 0.13 seconds
  [Batch] From `Similar Document Source` with ids List(4, 5, 2, 3, 6, 1) 0.13 seconds
  [Batch] From `Document Info Source` with ids List(4, 5, 2, 3, 6, 1) 0.08 seconds
[Round 2] 0.00 seconds
  [Batch] From `Document Info Source` with ids List(5, 2, 3, 6, 1) 0.00 seconds
  [Batch] From `Persons source` with ids List(1, 2, 3) 0.00 seconds
```
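This time round 2 requests the document info needed by `fetchSimilarItems` again, even though round 1 already fetched those documents.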
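Fetch lets you describe data access declaratively, while the library takes care of parallel requests, batching, deduplication and caching. Because it is built on Cats, Fetch works well with other Scala libraries from the same ecosystem, such as Doobie and fs2.
Similar libraries:
- ZQuery: a similar library built on ZIO;
- Clump: a library similar to Fetch, from 2015;
- Haxl: a Haskell library.
ZQuery and Fetch are based on the paper “There is no Fork: an Abstraction for Efficient, Concurrent, and Concise Data Access” (https://simonmar.github.io/bib/papers/haxl-icfp14.pdf), which describes Haxl.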