Udara/Udara/SvcApp/Poll/Processor/Mode/CsvProcessor.cs
2026-03-19 11:53:15 +08:00

137 lines
4.3 KiB
C#

using Microsoft.EntityFrameworkCore;
using System.Text.Json;
using Udara.Common.Feature.Csv;
using Udara.Database.App;
using Udara.Database.App.Model;
using Udara.Infrastructure.FileSystem;
using Udara.Infrastructure.FileTransfer;
using Udara.SvcApp.Poll.Processor.Interface;
namespace Udara.SvcApp.Poll.Processor.Mode;
/// <summary>
/// Result object produced by <see cref="CsvProcessor"/> for one device: the device itself
/// together with its parameter values keyed by parameter name.
/// </summary>
public class CsvResult : IDeviceResult
{
    /// <summary>
    /// Device the values belong to. Initialised to <c>default!</c>, so the creator is
    /// expected to assign it before the result is used.
    /// </summary>
    public DeviceListModel Device { get; set; } = default!;

    /// <summary>
    /// Parameter values keyed by parameter name. Starts out as an empty dictionary.
    /// </summary>
    public Dictionary<string, string> Parameters { get; set; } = new Dictionary<string, string>();
}
/// <summary>
/// <see cref="IDeviceProcessor"/> implementation that fills a device's configured parameters
/// from CSV files and passes the outcome to the <see cref="IProcessorResultCollector"/>.
/// </summary>
public class CsvProcessor : IDeviceProcessor
{
    // Shared by every export log entry. Creating a new JsonSerializerOptions per call forces
    // System.Text.Json to rebuild its type metadata each time.
    private static readonly JsonSerializerOptions s_exportJsonOptions = new()
    {
        WriteIndented = true
    };

    private readonly AppDbContext _db;
    private readonly ILogger<CsvProcessor> _logger;
    private readonly IProcessorResultCollector _collector;
    private readonly IFileTransfer _fileTransfer;
    private readonly IFileReader _fileReader;
    private readonly ICsvParser _csvParser;

    /// <summary>
    /// Creates the processor with the services it depends on.
    /// </summary>
    /// <param name="db">Database context used to read the device's datafile entry and parameters.</param>
    /// <param name="logger">Logger for errors, warnings and the JSON export entry.</param>
    /// <param name="collector">Receiver of the finished <see cref="CsvResult"/>.</param>
    /// <param name="fileTransfer">Supplies the files to read and is told which files were handled.</param>
    /// <param name="fileReader">Reads a file's text content.</param>
    /// <param name="csvParser">Parses file content into a header and rows.</param>
    public CsvProcessor(
        AppDbContext db,
        ILogger<CsvProcessor> logger,
        IProcessorResultCollector collector,
        IFileTransfer fileTransfer,
        IFileReader fileReader,
        ICsvParser csvParser
    )
    {
        _db = db;
        _logger = logger;
        _collector = collector;
        _fileTransfer = fileTransfer;
        _fileReader = fileReader;
        _csvParser = csvParser;
    }

    /// <summary>
    /// Reads the CSV files collected for <paramref name="device"/> and reports the values of its
    /// configured parameters to the result collector.
    /// </summary>
    /// <remarks>
    /// <para>
    /// If the device has no DeviceDatafile entry or no DeviceParameters entries, an error is
    /// logged and the method returns without calling the collector.
    /// </para>
    /// <para>
    /// Otherwise every configured parameter starts as an empty string. For each collected file
    /// the header is paired with the first data row, and values whose header name matches a
    /// configured parameter overwrite the current value. Later files therefore override earlier
    /// ones, and within one file the last occurrence of a repeated header name wins.
    /// </para>
    /// <para>
    /// Files with empty content, an empty header or no rows are skipped and are not passed to
    /// <c>ProcessedFilesAsync</c>. Exceptions raised while collecting, reading or parsing are
    /// logged and swallowed; the result gathered so far is still sent to the collector.
    /// </para>
    /// </remarks>
    /// <param name="device">Device whose files are processed.</param>
    /// <param name="token">Cancellation token forwarded to the database, file and collector calls.</param>
    /// <returns>A task that completes once the result has been handed to the collector.</returns>
    public async Task ProcessAsync(DeviceListModel device, CancellationToken token)
    {
        var config = await _db.DeviceDatafile
            .AsNoTracking()
            .FirstOrDefaultAsync(x => x.DeviceId == device.Id, token);
        if (config is null)
        {
            _logger.LogError("DeviceId {DeviceId}: No DeviceDatafile entry found", device.Id);
            return;
        }

        var parameters = await _db.DeviceParameters
            .AsNoTracking()
            .Where(x => x.DeviceId == device.Id)
            .OrderBy(x => x.ParameterId)
            .ToListAsync(token);
        if (parameters.Count == 0)
        {
            _logger.LogError("DeviceId {DeviceId}: No DeviceParameters entry found", device.Id);
            return;
        }

        // Every configured parameter is reported, defaulting to "" when no file supplies a value.
        var csvData = parameters.ToDictionary(r => r.ParameterName, _ => "");

        try
        {
            var collected = await _fileTransfer.CollectFilesAsync(
                config.ProcessName,
                config.TargetDir,
                config.FilePattern,
                config.FileExtension,
                token);

            foreach (var file in collected)
            {
                token.ThrowIfCancellationRequested();

                var content = await _fileReader.ReadAllTextAsync(file, token);
                if (string.IsNullOrEmpty(content))
                {
                    continue;
                }

                var parsed = _csvParser.Parse(content);
                if (parsed.Header.Length == 0 || parsed.Rows.Count == 0)
                {
                    _logger.LogWarning("DeviceId {DeviceId}: CSV file {File} has no usable data", device.Id, file);
                    continue;
                }

                // Walk the header/first-row pairs directly instead of materialising a dictionary:
                // a repeated header name would make ToDictionary throw and abort all remaining files.
                // Zip stops at the shorter sequence, so unmatched columns or values are ignored.
                foreach (var column in parsed.Header.Zip(parsed.Rows[0], (name, value) => new { name, value }))
                {
                    // Only configured parameters are reported; unknown columns are dropped.
                    if (csvData.ContainsKey(column.name))
                    {
                        csvData[column.name] = column.value;
                    }
                }

                await _fileTransfer.ProcessedFilesAsync(file, token);
            }
        }
        catch (TimeoutException ex)
        {
            _logger.LogError(ex, "DeviceId {DeviceId}: Timeout reading from AIO device", device.Id);
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "DeviceId {DeviceId}: Error processing CSV files", device.Id);
        }
        finally
        {
            // Runs on success and on failure, so the collector always receives a result for the
            // device, possibly with some or all values still empty.
            var result = new CsvResult
            {
                Device = device,
                Parameters = csvData
            };

            var json = JsonSerializer.Serialize(result, s_exportJsonOptions);
            _logger.LogInformation("=== DeviceId {DeviceId} JSON Export ===\n{json}", device.Id, json);

            await _collector.CollectAsync(device, result, token);
        }
    }
}