wip-feat: adds download message queue logic
This commit is contained in:
19
src/Download/Action/Command/DownloadMediaCommand.php
Normal file
19
src/Download/Action/Command/DownloadMediaCommand.php
Normal file
@@ -0,0 +1,19 @@
|
||||
<?php
|
||||
|
||||
namespace App\Download\Action\Command;
|
||||
|
||||
use OneToMany\RichBundle\Contract\CommandInterface;
|
||||
|
||||
/**
|
||||
* @implements CommandInterface<DownloadMediaCommand>
|
||||
*/
|
||||
class DownloadMediaCommand implements CommandInterface
|
||||
{
|
||||
public function __construct(
|
||||
public string $url,
|
||||
public string $title,
|
||||
public string $filename,
|
||||
public string $mediaType,
|
||||
public string $imdbId,
|
||||
) {}
|
||||
}
|
||||
@@ -1,14 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace App\Download\Action\Command;
|
||||
|
||||
use OneToMany\RichBundle\Contract\CommandInterface;
|
||||
|
||||
class GetDownloadOptionsCommand implements CommandInterface
|
||||
{
|
||||
/** @implements CommandInterface<GetDownloadOptionsCommand> */
|
||||
public function __construct(
|
||||
public string $tmdbId,
|
||||
public string $mediaType,
|
||||
) {}
|
||||
}
|
||||
58
src/Download/Action/Handler/DownloadMediaHandler.php
Normal file
58
src/Download/Action/Handler/DownloadMediaHandler.php
Normal file
@@ -0,0 +1,58 @@
|
||||
<?php
|
||||
|
||||
namespace App\Download\Action\Handler;
|
||||
|
||||
use App\Download\Action\Command\DownloadMediaCommand;
|
||||
use App\Download\Action\Result\DownloadMediaResult;
|
||||
use App\Download\Framework\Repository\DownloadRepository;
|
||||
use App\Download\Downloader\DownloaderInterface;
|
||||
use OneToMany\RichBundle\Contract\CommandInterface;
|
||||
use OneToMany\RichBundle\Contract\HandlerInterface;
|
||||
use OneToMany\RichBundle\Contract\ResultInterface;
|
||||
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
|
||||
use Symfony\Component\Messenger\Exception\UnrecoverableMessageHandlingException;
|
||||
|
||||
/** @implements HandlerInterface<DownloadMediaCommand, DownloadMediaResult> */
|
||||
#[AsMessageHandler]
|
||||
readonly class DownloadMediaHandler implements HandlerInterface
|
||||
{
|
||||
public function __construct(
|
||||
private DownloaderInterface $downloader,
|
||||
private DownloadRepository $downloadRepository,
|
||||
) {}
|
||||
|
||||
public function __invoke(CommandInterface $command)
|
||||
{
|
||||
$this->handle($command);
|
||||
}
|
||||
|
||||
public function handle(CommandInterface $command): ResultInterface
|
||||
{
|
||||
$download = $this->downloadRepository->insert(
|
||||
$command->url,
|
||||
$command->title,
|
||||
$command->filename,
|
||||
$command->imdbId,
|
||||
$command->mediaType,
|
||||
""
|
||||
);
|
||||
|
||||
try {
|
||||
$this->downloadRepository->updateStatus($download->getId(), 'In Progress');
|
||||
|
||||
$this->downloader->download(
|
||||
$command->mediaType,
|
||||
$command->title,
|
||||
$command->url,
|
||||
$download->getId()
|
||||
);
|
||||
|
||||
$this->downloadRepository->updateStatus($download->getId(), 'Complete');
|
||||
|
||||
} catch (\Throwable $exception) {
|
||||
throw new UnrecoverableMessageHandlingException($exception->getMessage(), 500);
|
||||
}
|
||||
|
||||
return new DownloadMediaResult(200, "Success.");
|
||||
}
|
||||
}
|
||||
@@ -1,22 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace App\Download\Action\Handler;
|
||||
|
||||
use App\Tmdb\Tmdb;
|
||||
use App\Torrentio\Client\Torrentio;
|
||||
use OneToMany\RichBundle\Contract\CommandInterface;
|
||||
use OneToMany\RichBundle\Contract\HandlerInterface;
|
||||
use OneToMany\RichBundle\Contract\ResultInterface;
|
||||
|
||||
class GetDownloadOptionsHandler implements HandlerInterface
|
||||
{
|
||||
public function __construct(
|
||||
private readonly Tmdb $tmdb,
|
||||
private readonly Torrentio $torrentio,
|
||||
) {}
|
||||
|
||||
public function handle(CommandInterface $command): ResultInterface
|
||||
{
|
||||
$media = $this->tmdb->mediaDetails($command->tmdbId, $command->mediaType);
|
||||
}
|
||||
}
|
||||
40
src/Download/Action/Input/DownloadMediaInput.php
Normal file
40
src/Download/Action/Input/DownloadMediaInput.php
Normal file
@@ -0,0 +1,40 @@
|
||||
<?php
|
||||
|
||||
namespace App\Download\Action\Input;
|
||||
|
||||
use App\Download\Action\Command\DownloadMediaCommand;
|
||||
use OneToMany\RichBundle\Attribute\SourceRequest;
|
||||
use OneToMany\RichBundle\Contract\CommandInterface;
|
||||
use OneToMany\RichBundle\Contract\InputInterface;
|
||||
|
||||
/** @implements InputInterface<DownloadMediaInput> */
|
||||
class DownloadMediaInput implements InputInterface
|
||||
{
|
||||
public function __construct(
|
||||
#[SourceRequest('url')]
|
||||
public string $url,
|
||||
|
||||
#[SourceRequest('title')]
|
||||
public string $title,
|
||||
|
||||
#[SourceRequest('filename')]
|
||||
public string $filename,
|
||||
|
||||
#[SourceRequest('mediaType')]
|
||||
public string $mediaType,
|
||||
|
||||
#[SourceRequest('imdbId')]
|
||||
public string $imdbId,
|
||||
) {}
|
||||
|
||||
public function toCommand(): CommandInterface
|
||||
{
|
||||
return new DownloadMediaCommand(
|
||||
$this->url,
|
||||
$this->title,
|
||||
$this->filename,
|
||||
$this->mediaType,
|
||||
$this->imdbId,
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -1,24 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace App\Download\Action\Input;
|
||||
|
||||
use App\Download\Action\Command\GetDownloadOptionsCommand;
|
||||
use OneToMany\RichBundle\Attribute\SourceRoute;
|
||||
use OneToMany\RichBundle\Contract\CommandInterface;
|
||||
use OneToMany\RichBundle\Contract\InputInterface;
|
||||
|
||||
class GetDownloadOptionsInput implements InputInterface
|
||||
{
|
||||
public function __construct(
|
||||
#[SourceRoute('tmdbId')]
|
||||
public string $tmdbId,
|
||||
|
||||
#[SourceRoute('mediaType')]
|
||||
public string $mediaType,
|
||||
) {}
|
||||
|
||||
public function toCommand(): CommandInterface
|
||||
{
|
||||
return new GetDownloadOptionsCommand($this->tmdbId, $this->mediaType);
|
||||
}
|
||||
}
|
||||
14
src/Download/Action/Result/DownloadMediaResult.php
Normal file
14
src/Download/Action/Result/DownloadMediaResult.php
Normal file
@@ -0,0 +1,14 @@
|
||||
<?php
|
||||
|
||||
namespace App\Download\Action\Result;
|
||||
|
||||
use OneToMany\RichBundle\Contract\ResultInterface;
|
||||
|
||||
/** @implements ResultInterface<DownloadMediaResult> */
|
||||
class DownloadMediaResult implements ResultInterface
|
||||
{
|
||||
public function __construct(
|
||||
public int $status,
|
||||
public string $message,
|
||||
) {}
|
||||
}
|
||||
@@ -1,12 +0,0 @@
|
||||
<?php
|
||||
|
||||
namespace App\Download\Action\Result;
|
||||
|
||||
use App\Tmdb\TmdbResult;
|
||||
|
||||
class GetDownloadOptionsResult
|
||||
{
|
||||
public function __construct(
|
||||
public TmdbResult $media,
|
||||
) {}
|
||||
}
|
||||
20
src/Download/Downloader/DownloaderInterface.php
Normal file
20
src/Download/Downloader/DownloaderInterface.php
Normal file
@@ -0,0 +1,20 @@
|
||||
<?php
|
||||
|
||||
namespace App\Download\Downloader;
|
||||
|
||||
|
||||
use App\Message\DownloadMessage;
|
||||
use App\Message\DownloadMovieMessage;
|
||||
use App\Message\DownloadTvShowMessage;
|
||||
|
||||
interface DownloaderInterface
|
||||
{
|
||||
/**
|
||||
* @param string $baseDir
|
||||
* @param string $title
|
||||
* @param string $url
|
||||
* @return void
|
||||
* Downloads the requested file.
|
||||
*/
|
||||
public function download(string $baseDir, string $title, string $url, ?int $downloadId): void;
|
||||
}
|
||||
64
src/Download/Downloader/ProcessDownloader.php
Normal file
64
src/Download/Downloader/ProcessDownloader.php
Normal file
@@ -0,0 +1,64 @@
|
||||
<?php
|
||||
|
||||
namespace App\Download\Downloader;
|
||||
|
||||
use App\Download\Framework\Entity\Download;
|
||||
use Doctrine\ORM\EntityManagerInterface;
|
||||
use Symfony\Component\Process\Exception\ProcessFailedException;
|
||||
use Symfony\Component\Process\Process;
|
||||
|
||||
class ProcessDownloader implements DownloaderInterface
|
||||
{
|
||||
public function __construct(
|
||||
private EntityManagerInterface $entityManager,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
*/
|
||||
public function download(string $baseDir, string $title, string $url, ?int $downloadId): void
|
||||
{
|
||||
/** @var Download $downloadEntity */
|
||||
$downloadEntity = $this->entityManager->getRepository(Download::class)->find($downloadId);
|
||||
$downloadEntity->setProgress(0);
|
||||
$this->entityManager->flush();
|
||||
|
||||
$process = new Process([
|
||||
'/bin/sh',
|
||||
'/var/www/bash/app/wget_download.sh',
|
||||
$baseDir,
|
||||
$title,
|
||||
$url
|
||||
]);
|
||||
|
||||
$process->setTimeout(1800); // 30 min
|
||||
$process->setIdleTimeout(600); // 10 min
|
||||
|
||||
$process->start();
|
||||
|
||||
try {
|
||||
$progress = 0;
|
||||
$this->entityManager->flush();
|
||||
$process->wait(function ($type, $buffer) use ($progress, $downloadEntity): void {
|
||||
if (Process::ERR === $type) {
|
||||
$pregMatchOutput = [];
|
||||
preg_match('/[\d]+%/', $buffer, $pregMatchOutput);
|
||||
|
||||
if (!empty($pregMatchOutput)) {
|
||||
if ($pregMatchOutput[0] !== $progress) {
|
||||
$progress = (int) $pregMatchOutput[0];
|
||||
$downloadEntity->setProgress($progress);
|
||||
$this->entityManager->flush();
|
||||
}
|
||||
}
|
||||
}
|
||||
fwrite(STDOUT, $buffer);
|
||||
});
|
||||
$downloadEntity->setProgress(100);
|
||||
} catch (ProcessFailedException $exception) {
|
||||
$downloadEntity->setStatus('Failed');
|
||||
}
|
||||
|
||||
$this->entityManager->flush();
|
||||
}
|
||||
}
|
||||
27
src/Download/Downloader/WgetDownloader.php
Normal file
27
src/Download/Downloader/WgetDownloader.php
Normal file
@@ -0,0 +1,27 @@
|
||||
<?php
|
||||
|
||||
namespace App\Download\Downloader;
|
||||
|
||||
use App\Message\DownloadMessage;
|
||||
use App\Message\DownloadMovieMessage;
|
||||
use App\Message\DownloadTvShowMessage;
|
||||
|
||||
class WgetDownloader implements DownloaderInterface
|
||||
{
|
||||
|
||||
/**
|
||||
* @inheritDoc
|
||||
* SSHs into the NAS and performs the download.
|
||||
* This way retains the fast DL speed on the NAS.
|
||||
*/
|
||||
public function download(string $baseDir, string $title, string $url, ?int $downloadId): void
|
||||
{
|
||||
// SSHs into the NAS, cds into movies dir, makes new dir based on filename, cds into that dir, downloads movie
|
||||
system(sprintf(
|
||||
'sh /var/www/bash/app/wget_download.sh "%s" "%s" "%s"',
|
||||
$baseDir,
|
||||
$title,
|
||||
$url
|
||||
));
|
||||
}
|
||||
}
|
||||
0
src/Download/Framework/Entity/.gitignore
vendored
Normal file
0
src/Download/Framework/Entity/.gitignore
vendored
Normal file
149
src/Download/Framework/Entity/Download.php
Normal file
149
src/Download/Framework/Entity/Download.php
Normal file
@@ -0,0 +1,149 @@
|
||||
<?php
|
||||
|
||||
namespace App\Download\Framework\Entity;
|
||||
|
||||
use App\Repository\DownloadRepository;
|
||||
use Doctrine\ORM\Mapping as ORM;
|
||||
use Symfony\UX\Turbo\Attribute\Broadcast;
|
||||
|
||||
#[ORM\Entity(repositoryClass: DownloadRepository::class)]
|
||||
#[Broadcast]
|
||||
class Download
|
||||
{
|
||||
#[ORM\Id]
|
||||
#[ORM\GeneratedValue]
|
||||
#[ORM\Column]
|
||||
private ?int $id = null;
|
||||
|
||||
#[ORM\Column(length: 20, nullable: true)]
|
||||
private ?string $imdbId = null;
|
||||
|
||||
#[ORM\Column(length: 255, nullable: true)]
|
||||
private ?string $mediaType = null;
|
||||
|
||||
#[ORM\Column(length: 255, nullable: true)]
|
||||
private ?string $title = null;
|
||||
|
||||
#[ORM\Column(length: 1024)]
|
||||
private ?string $url = null;
|
||||
|
||||
#[ORM\Column(length: 1024, nullable: true)]
|
||||
private ?string $filename = null;
|
||||
|
||||
#[ORM\Column(length: 255, nullable: true)]
|
||||
private ?string $status = null;
|
||||
|
||||
#[ORM\Column(nullable: true)]
|
||||
private ?int $progress = null;
|
||||
|
||||
#[ORM\Column(length: 255, nullable: true)]
|
||||
private ?string $batchId = null;
|
||||
|
||||
public function getId(): ?int
|
||||
{
|
||||
return $this->id;
|
||||
}
|
||||
|
||||
public function setId(int $id): static
|
||||
{
|
||||
$this->id = $id;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function getImdbId(): ?string
|
||||
{
|
||||
return $this->imdbId;
|
||||
}
|
||||
|
||||
public function setImdbId(?string $imdbId): static
|
||||
{
|
||||
$this->imdbId = $imdbId;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function getMediaType(): ?string
|
||||
{
|
||||
return $this->mediaType;
|
||||
}
|
||||
|
||||
public function setMediaType(?string $mediaType): static
|
||||
{
|
||||
$this->mediaType = $mediaType;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function getTitle(): ?string
|
||||
{
|
||||
return $this->title;
|
||||
}
|
||||
|
||||
public function setTitle(?string $title): static
|
||||
{
|
||||
$this->title = $title;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function getUrl(): ?string
|
||||
{
|
||||
return $this->url;
|
||||
}
|
||||
|
||||
public function setUrl(string $url): static
|
||||
{
|
||||
$this->url = $url;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function getFilename(): ?string
|
||||
{
|
||||
return $this->filename;
|
||||
}
|
||||
|
||||
public function setFilename(?string $filename): static
|
||||
{
|
||||
$this->filename = $filename;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function getStatus(): ?string
|
||||
{
|
||||
return $this->status;
|
||||
}
|
||||
|
||||
public function setStatus(?string $status): static
|
||||
{
|
||||
$this->status = $status;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function getProgress(): ?int
|
||||
{
|
||||
return $this->progress;
|
||||
}
|
||||
|
||||
public function setProgress(?int $progress): static
|
||||
{
|
||||
$this->progress = $progress;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
public function getBatchId(): ?string
|
||||
{
|
||||
return $this->batchId;
|
||||
}
|
||||
|
||||
public function setBatchId(?string $batchId): static
|
||||
{
|
||||
$this->batchId = $batchId;
|
||||
|
||||
return $this;
|
||||
}
|
||||
}
|
||||
0
src/Download/Framework/Repository/.gitignore
vendored
Normal file
0
src/Download/Framework/Repository/.gitignore
vendored
Normal file
118
src/Download/Framework/Repository/DownloadRepository.php
Normal file
118
src/Download/Framework/Repository/DownloadRepository.php
Normal file
@@ -0,0 +1,118 @@
|
||||
<?php
|
||||
|
||||
namespace App\Download\Framework\Repository;
|
||||
|
||||
use App\Download\Framework\Entity\Download;
|
||||
use App\ValueObject\DownloadRequest;
|
||||
use Doctrine\Bundle\DoctrineBundle\Repository\ServiceEntityRepository;
|
||||
use Doctrine\Persistence\ManagerRegistry;
|
||||
use Knp\Component\Pager\Paginator;
|
||||
use Knp\Component\Pager\PaginatorInterface;
|
||||
|
||||
/**
|
||||
* @extends ServiceEntityRepository<Download>
|
||||
*/
|
||||
class DownloadRepository extends ServiceEntityRepository
|
||||
{
|
||||
private ManagerRegistry $managerRegistry;
|
||||
|
||||
public function __construct(ManagerRegistry $registry, ManagerRegistry $managerRegistry)
|
||||
{
|
||||
parent::__construct($registry, Download::class);
|
||||
$this->managerRegistry = $managerRegistry;
|
||||
}
|
||||
|
||||
public function getCompletePaginated(int $pageNumber = 1, int $perPage = 10)
|
||||
{
|
||||
$firstResult = ($pageNumber - 1) * $perPage;
|
||||
$query = $this->createQueryBuilder('d')
|
||||
->andWhere('d.status IN (:statuses)')
|
||||
->orderBy('d.id', 'DESC')
|
||||
->setParameter('statuses', ['Complete'])
|
||||
->setFirstResult($firstResult)
|
||||
->setMaxResults($perPage)
|
||||
->getQuery();
|
||||
|
||||
return new \Doctrine\ORM\Tools\Pagination\Paginator($query);
|
||||
}
|
||||
|
||||
public function getActivePaginated(int $pageNumber = 1, int $perPage = 10)
|
||||
{
|
||||
$firstResult = ($pageNumber - 1) * $perPage;
|
||||
$query = $this->createQueryBuilder('d')
|
||||
->andWhere('d.status IN (:statuses)')
|
||||
->setParameter('statuses', ['New', 'In Progress'])
|
||||
->setFirstResult($firstResult)
|
||||
->setMaxResults($perPage)
|
||||
->getQuery();
|
||||
|
||||
return new \Doctrine\ORM\Tools\Pagination\Paginator($query);
|
||||
}
|
||||
|
||||
public function insert(
|
||||
string $url,
|
||||
string $title,
|
||||
string $filename,
|
||||
string $imdbId,
|
||||
string $mediaType,
|
||||
string $batchId,
|
||||
string $status = 'New'
|
||||
): Download {
|
||||
$download = (new Download())
|
||||
->setUrl($url)
|
||||
->setTitle($title)
|
||||
->setFilename($filename)
|
||||
->setImdbId($imdbId)
|
||||
->setMediaType($mediaType)
|
||||
->setBatchId($batchId)
|
||||
->setStatus($status);
|
||||
|
||||
$this->getEntityManager()->persist($download);
|
||||
$this->getEntityManager()->flush();
|
||||
|
||||
return $download;
|
||||
}
|
||||
|
||||
public function insertFromDownloadRequest(DownloadRequest $request): Download
|
||||
{
|
||||
$download = (new Download())
|
||||
->setUrl($request->downloadUrl)
|
||||
->setTitle($request->seriesName)
|
||||
->setFilename($request->filename)
|
||||
->setImdbId($request->imdbCode)
|
||||
->setMediaType($request->mediaType)
|
||||
->setStatus('New');
|
||||
|
||||
$this->getEntityManager()->persist($download);
|
||||
$this->getEntityManager()->flush();
|
||||
|
||||
return $download;
|
||||
}
|
||||
|
||||
public function updateStatus(int $id, string $status): Download
|
||||
{
|
||||
$download = $this->find($id);
|
||||
$download->setStatus($status);
|
||||
$this->getEntityManager()->flush();
|
||||
return $download;
|
||||
}
|
||||
|
||||
public function delete(int $id)
|
||||
{
|
||||
$entity = $this->find($id);
|
||||
$this->getEntityManager()->remove($entity);
|
||||
$this->getEntityManager()->flush();
|
||||
}
|
||||
|
||||
public function getPendingByBatchId(string $batchId): ?array
|
||||
{
|
||||
$query = $this->createQueryBuilder('d')
|
||||
->andWhere('d.status IN (:statuses)')
|
||||
->andWhere('d.batchId = :batchId')
|
||||
->setParameter('statuses', ['New', 'In Progress'])
|
||||
->setParameter('batchId', $batchId)
|
||||
->getQuery();
|
||||
|
||||
return $query->getResult();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user